package com.aotain.nyx.port;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import com.aotain.nyx.ipstat.IPStatTuple;


public class PortAbnReduce  implements ReduceFunction<Tuple2<String,IPStatTuple>> {

	/**
	 *
	 */
	    
	private static final long serialVersionUID = 8998458657921056729L;

	@Override
	public Tuple2<String, IPStatTuple> reduce(
			Tuple2<String, IPStatTuple> value1,
			Tuple2<String, IPStatTuple> value2) throws Exception {
		
		Tuple2<String, IPStatTuple> ret = new Tuple2<String, IPStatTuple>();
		IPStatTuple tuple = new IPStatTuple();
		tuple.setDestIP(value1.f1.getDestIP());
		tuple.setDestPort(value1.f1.getDestPort());
		tuple.setSessionNum(value1.f1.getSessionNum() + value2.f1.getSessionNum());
		
		ret.setFields(value1.f0, tuple);
		return ret;
		    
	}

}

    